use std::collections::{HashMap, HashSet};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::sync::{mpsc, Arc};
use std::thread;

use anyhow::{anyhow, Context, Result};
use chrono::{Datelike, Local};
use encoding_rs::GBK;
use memmap2::{Mmap, MmapMut};
use rayon::prelude::*;
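/// Parsed contents of the 32-byte DBF file header plus its field descriptors.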
#[derive(Clone)]
pub struct DbfHeader {
    pub record_count: u32,
    pub header_len: u16,
    pub record_len: u16,
    pub fields: Vec<FieldDescriptor>,
    pub file_type: u8,
    pub year: u8,
    pub month: u8,
    pub day: u8,
    pub reserved: [u8; 20],
}
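/// A single 32-byte field descriptor: name, type character ('C', 'N', 'D', 'L', ...),
/// byte length, and decimal count.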
#[derive(Debug, Clone)]
pub struct FieldDescriptor {
    pub name: String,
    pub field_type: char,
    pub length: u8,
    pub decimal: u8,
}
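/// Internal variant of [`FieldDescriptor`] that keeps the type as a raw byte.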
#[derive(Debug, Clone)]
struct FieldDescriptor1 {
    name: String,
    field_type: u8,
    length: u8,
    decimal: u8,
}
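/// Decodes one fixed-width record (deletion flag followed by field bytes) into a
/// field-name → value map, decoding text as GBK and trimming padding.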
fn parse_record(data: &[u8], fields: &[FieldDescriptor]) -> Result<HashMap<String, String>> {
let mut record = HashMap::new();
let mut offset = 1;
for field in fields {
let end = offset + field.length as usize;
if end > data.len() {
break;
}
let field_data = &data[offset..end];
let (value, _, _) = GBK.decode(field_data);
let value = value.trim().to_string();
record.insert(field.name.clone(), value);
offset = end;
}
Ok(record)
}
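/// Reads all records from a DBF file by memory-mapping it and parsing chunks of
/// records on several worker threads. Soft-deleted records (flag `0x2A`) are skipped.
///
/// Example (illustrative sketch; `data.dbf` is a hypothetical path):
///
/// ```ignore
/// let records = read_dbf_concurrent("data.dbf")?;
/// println!("read {} records", records.len());
/// ```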
pub fn read_dbf_concurrent(path: &str) -> Result<Vec<HashMap<String, String>>> {
let file = File::open(path).context("Failed to open DBF file")?;
let mmap = unsafe { Mmap::map(&file).context("Failed to map DBF file into memory")? };
let header = parse_header(&mmap)?;
    // Use at most one thread per ~1000 records, capped at the number of CPUs.
    let num_threads = num_cpus::get().min(header.record_count as usize / 1000 + 1);
    let records_per_thread = (header.record_count as usize + num_threads - 1) / num_threads;
    let mmap = Arc::new(mmap);
    let (tx, rx) = mpsc::channel();
let record_size = header.record_len as usize;
let base_offset = header.header_len as usize;
for i in 0..num_threads {
let tx = tx.clone();
let mmap = Arc::clone(&mmap);
let fields = header.fields.clone();
thread::spawn(move || {
let start_idx = i * records_per_thread;
let end_idx = (start_idx + records_per_thread).min(header.record_count as usize);
            let start_offset = (base_offset + start_idx * record_size).min(mmap.len());
            let end_offset = (base_offset + end_idx * record_size).min(mmap.len());
            let chunk = &mmap[start_offset..end_offset];
            let mut records = Vec::with_capacity(end_idx - start_idx);
            for record_data in chunk.chunks(record_size) {
                // 0x2A ('*') marks a soft-deleted record; skip it.
                if record_data[0] == 0x2A {
                    continue;
                }
match parse_record(record_data, &fields) {
Ok(record) => records.push(record),
Err(e) => {
eprintln!("Error parsing record: {:?}", e);
}
}
}
tx.send(records).unwrap();
});
}
    // Dropping the original sender lets the receive loop end once every worker is done.
    drop(tx);
    let mut all_records = Vec::with_capacity(header.record_count as usize);
for received in rx {
all_records.extend(received);
}
Ok(all_records)
}
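/// Parses the 32-byte file header and the field descriptor array that follows it.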
fn parse_header(data: &[u8]) -> Result<DbfHeader> {
    if data.len() < 32 {
        return Err(anyhow!("Incomplete header data: fewer than 32 bytes"));
    }
let header_data = &data[0..32];
let record_count = u32::from_le_bytes(header_data[4..8].try_into()?);
let header_len = u16::from_le_bytes(header_data[8..10].try_into()?);
let record_len = u16::from_le_bytes(header_data[10..12].try_into()?);
let reserved = header_data[12..32].try_into()?;
    let fields_start = 32;
    let fields_end = header_len as usize;
    if data.len() < fields_end {
        return Err(anyhow!("Field descriptors exceed data bounds"));
    }
let fields = parse_field_descriptors(&data[fields_start..fields_end])?;
    Ok(DbfHeader {
        record_count,
        header_len,
        record_len,
        fields,
        reserved,
        file_type: header_data[0],
        year: header_data[1],
        month: header_data[2],
        day: header_data[3],
    })
}
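/// Parses consecutive 32-byte field descriptors until the 0x0D terminator or the
/// end of the header area.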
fn parse_field_descriptors(data: &[u8]) -> Result<Vec<FieldDescriptor>> {
let mut fields = Vec::new();
let mut cursor = 0;
    while cursor + 32 <= data.len() {
        // A 0x0D byte in place of a descriptor terminates the field list.
        if data[cursor] == 0x0D {
            break;
        }
        let chunk = &data[cursor..cursor + 32];
        // The field name occupies bytes 0..11 and is NUL-padded.
        let name_end = chunk[0..11].iter().position(|&b| b == 0).unwrap_or(11);
        let name = String::from_utf8_lossy(&chunk[0..name_end]).trim().to_string();
        let field_type = char::from(chunk[11]);
        let length = chunk[16];
        let decimal = chunk[17];
        fields.push(FieldDescriptor {
            name,
            field_type,
            length,
            decimal,
        });
        cursor += 32;
    }
Ok(fields)
}
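/// Returns the field descriptors of a DBF file without reading any records.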
pub fn get_dbf_fields(path: &str) -> Result<Vec<FieldDescriptor>> {
let file = File::open(path).context("Failed to open DBF file")?;
let mmap = unsafe { Mmap::map(&file).context("Failed to map DBF file into memory")? };
let header = parse_header(&mmap)?;
Ok(header.fields)
}
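/// Scans the whole file on several threads and returns the records whose values
/// equal every key/value pair in `query`.
///
/// Example (illustrative sketch; the field name `NAME` is hypothetical):
///
/// ```ignore
/// let mut query = HashMap::new();
/// query.insert("NAME".to_string(), "Alice".to_string());
/// let hits = find_records("data.dbf", &query)?;
/// ```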
pub fn find_records(
path: &str,
query: &HashMap<String, String>,
) -> Result<Vec<HashMap<String, String>>> {
let file = File::open(path).context("Failed to open DBF file")?;
let mmap = unsafe { Mmap::map(&file).context("Failed to map DBF file into memory")? };
let header = parse_header(&mmap)?;
    let num_threads = num_cpus::get().min(header.record_count as usize / 1000 + 1);
    let records_per_thread = (header.record_count as usize + num_threads - 1) / num_threads;
    let mmap = Arc::new(mmap);
    let (tx, rx) = mpsc::channel();
    let record_size = header.record_len as usize;
    let base_offset = header.header_len as usize;
    let fields = Arc::new(header.fields);
    // Share a single copy of the query across all worker threads.
    let query = Arc::new(query.clone());
for i in 0..num_threads {
let tx = tx.clone();
let mmap = Arc::clone(&mmap);
let fields = Arc::clone(&fields);
        let query = Arc::clone(&query);
thread::spawn(move || {
let start_idx = i * records_per_thread;
let end_idx = (start_idx + records_per_thread).min(header.record_count as usize);
            let start_offset = (base_offset + start_idx * record_size).min(mmap.len());
            let end_offset = (base_offset + end_idx * record_size).min(mmap.len());
            let chunk = &mmap[start_offset..end_offset];
            let mut records = Vec::with_capacity(end_idx - start_idx);
            for record_data in chunk.chunks(record_size) {
                // Skip soft-deleted records (deletion flag 0x2A, '*').
                if record_data[0] == 0x2A {
                    continue;
                }
match parse_record(record_data, &fields) {
Ok(record) => {
if record_matches_query(&record, &query) {
records.push(record);
}
}
Err(e) => {
eprintln!("Error parsing record: {:?}", e);
}
}
}
tx.send(records).unwrap();
});
}
    drop(tx);
    let mut all_records = Vec::new();
for received in rx {
all_records.extend(received);
}
Ok(all_records)
}
fn record_matches_query(
record: &HashMap<String, String>,
query: &HashMap<String, String>,
) -> bool {
query.iter().all(|(key, value)| {
record.get(key).map_or(false, |record_value| record_value == value)
})
}
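/// Writes a complete DBF file: header, field descriptors, then one fixed-width
/// record per map. Character fields are encoded as GBK and left-justified; other
/// field types are right-justified.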
pub fn write_dbf(path: &str, records: &[HashMap<String, String>], fields: &[FieldDescriptor]) -> Result<(), std::io::Error> {
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
let today = Local::now();
    // Header: 32-byte prefix + one 32-byte descriptor per field + 0x0D terminator.
    let header_len = 32 + fields.len() * 32 + 1;
    // Each record is the 1-byte deletion flag followed by the fixed-width fields.
    let record_len = fields.iter().map(|f| f.length as usize).sum::<usize>() + 1;
    let mut header = vec![0u8; 32];
    header[0] = 0x03; // dBASE III without memo
    header[1] = (today.year() - 1900) as u8;
    header[2] = today.month() as u8;
    header[3] = today.day() as u8;
    header[4..8].copy_from_slice(&(records.len() as u32).to_le_bytes());
    header[8..10].copy_from_slice(&(header_len as u16).to_le_bytes());
    header[10..12].copy_from_slice(&(record_len as u16).to_le_bytes());
writer.write_all(&header)?;
    for field in fields {
        let mut field_data = [0u8; 32];
        let name_bytes = field.name.as_bytes();
        let name_len = name_bytes.len().min(11);
        field_data[0..name_len].copy_from_slice(&name_bytes[0..name_len]);
        field_data[11] = field.field_type as u8;
        field_data[16] = field.length;
        field_data[17] = field.decimal;
        writer.write_all(&field_data)?;
    }
    // Terminate the field descriptor array.
    writer.write_all(&[0x0D])?;
    let mut buffer = Vec::with_capacity(record_len);
    for record in records {
        buffer.clear();
        buffer.push(0x20); // not deleted
        for field in fields {
            let value = record.get(&field.name).map(|s| s.as_str()).unwrap_or("");
            let encoded = if field.field_type == 'C' {
                let (encoded, _, _) = GBK.encode(value);
                encoded.to_vec()
            } else {
                value.as_bytes().to_vec()
            };
            let mut padded = vec![0x20; field.length as usize];
            let copy_len = encoded.len().min(field.length as usize);
            match field.field_type {
                // Character fields are left-justified; everything else right-justified.
                'C' => padded[..copy_len].copy_from_slice(&encoded[..copy_len]),
                _ => {
                    let start = field.length as usize - copy_len;
                    padded[start..].copy_from_slice(&encoded[..copy_len]);
                }
            }
            buffer.extend(&padded);
        }
        writer.write_all(&buffer)?;
    }
    // Conventional end-of-file marker.
    writer.write_all(&[0x1A])?;
    writer.flush()?;
Ok(())
}
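/// Applies `updates` to every record matching `query`, rewrites the whole file,
/// and returns the number of records changed.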
pub fn update_records(path: &str, query: HashMap<String, String>, updates: HashMap<String, String>) -> Result<usize, std::io::Error> {
    let mut all_records = read_dbf_concurrent(path)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
    let mut updated_count = 0;
    for record in all_records.iter_mut() {
        if !record_matches_query(record, &query) {
            continue;
        }
        for (key, value) in &updates {
            if let Some(record_value) = record.get_mut(key) {
                *record_value = value.clone();
            }
        }
        updated_count += 1;
    }
println!("更新后的所有记录:");
for record in &all_records {
for (key, value) in record {
println!("{} -> {}", key, value);
}
println!("------");
}
let fields = get_dbf_fields(path)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
write_dbf(path, &all_records, &fields)?;
Ok(updated_count)
}
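/// Appends one record after filling missing fields with empty strings and
/// validating character lengths and numeric values, then rewrites the file.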
pub fn add_record(path: &str, mut new_record: HashMap<String, String>) -> Result<(), std::io::Error> {
let mut all_records = read_dbf_concurrent(path)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
let fields = get_dbf_fields(path)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
for field in &fields {
if !new_record.contains_key(&field.name) {
new_record.insert(field.name.clone(), String::new());
}
}
for field in &fields {
if let Some(value) = new_record.get(&field.name) {
match field.field_type {
                'C' => {
                    // Validate against the GBK-encoded byte length, since that is
                    // what actually gets written into the fixed-width field.
                    let (encoded, _, _) = GBK.encode(value);
                    if encoded.len() > field.length as usize {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!(
                                "Field {} exceeds maximum length of {}",
                                field.name, field.length
                            ),
                        ));
                    }
                }
                'N' => {
                    // An empty value is written as blanks; only validate non-empty input.
                    if !value.is_empty() && value.parse::<f64>().is_err() {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!("Field {} must be a valid number", field.name),
                        ));
                    }
                }
_ => {}
}
}
}
all_records.push(new_record.clone());
println!("新增记录:");
for (key, value) in &new_record {
println!("{} -> {}", key, value);
}
write_dbf(path, &all_records, &fields)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
Ok(())
}
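/// Removes every record matching `query`, rewrites the file, and returns the
/// number of records removed.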
pub fn delete_records(path: &str, query: &HashMap<String, String>) -> Result<usize> {
let mut all_records = read_dbf_concurrent(path)
.context("Failed to read DBF records")?;
let original_count = all_records.len();
    // Keep only the records that do not match every key/value pair in the query.
    all_records.retain(|record| !is_record_matching(record, query));
let deleted_count = original_count - all_records.len();
let fields = get_dbf_fields(path)
.context("Failed to get DBF fields")?;
write_dbf(path, &all_records, &fields)
.context("Failed to write updated DBF file")?;
Ok(deleted_count)
}
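/// Reads only the record count from the file header (bytes 4..8, little-endian).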
pub fn get_record_count(path: &str) -> Result<u32> {
let file = File::open(path)
.context(format!("Failed to open file: {}", path))?;
let mmap = unsafe { Mmap::map(&file) }
.context("Failed to map file to memory")?;
    if mmap.len() < 8 {
        return Err(anyhow!("File too small to contain a valid DBF header"));
    }
let record_count = u32::from_le_bytes(mmap[4..8].try_into()?);
Ok(record_count)
}
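/// Reads a single 1-based page of records without materializing the whole file,
/// parsing the page's records in parallel with rayon.
///
/// Example (illustrative sketch):
///
/// ```ignore
/// // Second page of 100 records.
/// let page = read_dbf_paged("data.dbf", 2, 100)?;
/// ```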
pub fn read_dbf_paged(
path: &str,
page: usize,
page_size: usize,
) -> Result<Vec<HashMap<String, String>>> {
let file = File::open(path)
.context(format!("Failed to open file: {}", path))?;
let mmap = Arc::new(unsafe { Mmap::map(&file) }
.context("Failed to map file to memory")?);
let header = parse_header(&mmap)
.context("Failed to parse DBF header")?;
let total_records = header.record_count as usize;
    // Pages are 1-based; guard against page == 0.
    let start_idx = page.saturating_sub(1).saturating_mul(page_size);
let end_idx = (start_idx + page_size).min(total_records);
if start_idx >= total_records {
return Ok(Vec::new());
}
let record_size = header.record_len as usize;
let data_start = header.header_len as usize;
let start_offset = data_start + start_idx * record_size;
let end_offset = data_start + end_idx * record_size;
let records_data = &mmap[start_offset..end_offset.min(mmap.len())];
let records: Result<Vec<_>, _> = records_data
.par_chunks(record_size)
.filter(|chunk| chunk[0] != 0x2A)
.map(|chunk| parse_record(chunk, &header.fields))
.collect();
    records.map_err(|e| anyhow!("Error parsing records: {}", e))
}
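/// Filters the whole file by `query` on several threads, then returns the
/// requested 1-based page of the matching records.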
pub fn read_dbf_paged_query(
path: &str,
query: &HashMap<String, String>,
page: usize,
page_size: usize,
) -> Result<Vec<HashMap<String, String>>> {
let file = File::open(path).context("Failed to open DBF file")?;
let mmap = unsafe { Mmap::map(&file) }.context("Failed to memory-map DBF file")?;
let header = parse_header(&mmap).context("Failed to parse DBF header")?;
let num_threads = num_cpus::get().min(header.record_count as usize / 1000 + 1);
let records_per_thread = (header.record_count as usize + num_threads - 1) / num_threads;
let mmap = Arc::new(mmap);
let query = Arc::new(query.clone());
let (tx, rx) = mpsc::channel();
let record_size = header.record_len as usize;
let base_offset = header.header_len as usize;
for i in 0..num_threads {
let tx = tx.clone();
let mmap = Arc::clone(&mmap);
let fields = header.fields.clone();
let query = Arc::clone(&query);
thread::spawn(move || {
let start_idx = i * records_per_thread;
let end_idx = (start_idx + records_per_thread).min(header.record_count as usize);
            let start_offset = (base_offset + start_idx * record_size).min(mmap.len());
            let end_offset = (base_offset + end_idx * record_size).min(mmap.len());
            let chunk = &mmap[start_offset..end_offset];
let mut records = Vec::with_capacity(end_idx - start_idx);
for record_data in chunk.chunks(record_size) {
if record_data[0] == 0x2A {
continue;
}
match parse_record(record_data, &fields) {
Ok(record) => {
if matches_query(&record, &query) {
records.push(record);
}
}
Err(e) => {
eprintln!("Error parsing record: {}", e);
}
}
}
tx.send(records).unwrap();
});
}
drop(tx);
let mut all_records = Vec::new();
for received in rx {
all_records.extend(received);
}
    // Pages are 1-based; guard against page == 0.
    let start = page.saturating_sub(1) * page_size;
let end = start + page_size;
let total = all_records.len();
Ok(if start >= total {
Vec::new()
} else {
all_records[start..end.min(total)].to_vec()
})
}
fn matches_query(record: &HashMap<String, String>, query: &HashMap<String, String>) -> bool {
query.iter()
.all(|(k, v)| record.get(k).map_or(false, |val| val == v))
}
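/// Appends new field descriptors (existing names are skipped) and pads every
/// record with spaces for the added columns, rewriting the file in place.
///
/// Example (illustrative sketch; `REMARK` is a hypothetical field):
///
/// ```ignore
/// let remark = FieldDescriptor {
///     name: "REMARK".to_string(),
///     field_type: 'C',
///     length: 40,
///     decimal: 0,
/// };
/// add_fields_to_dbf("data.dbf", vec![remark])?;
/// ```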
pub fn add_fields_to_dbf(path: &str, add_fields: Vec<FieldDescriptor>) -> Result<()> {
let file_data = {
let file = File::open(path).context("Failed to open DBF file")?;
let mut buf = Vec::new();
BufReader::new(file).read_to_end(&mut buf).context("Failed to read DBF file")?;
buf
};
let header = parse_header(&file_data).context("Failed to parse DBF header")?;
if header.record_len == 0 {
return Err(anyhow!("Original record length cannot be zero"));
}
let old_record_len = header.record_len as usize;
let data_start = header.header_len as usize;
let expected_data_len = header.record_count as usize * old_record_len;
if file_data.len() < data_start + expected_data_len {
return Err(anyhow!("File data is incomplete or corrupted"));
}
let mut fields = header.fields.clone();
let existing_names: HashSet<_> = fields.iter().map(|f| &f.name).collect();
let new_fields = add_fields.into_iter()
.filter(|f| !existing_names.contains(&f.name))
.collect::<Vec<_>>();
fields.extend(new_fields);
    let updated_header_len = 32 + (fields.len() * 32) as u16 + 1;
    // The record length includes the leading deletion-flag byte.
    let updated_record_len = fields.iter().map(|f| f.length as usize).sum::<usize>() + 1;
if updated_record_len < old_record_len {
return Err(anyhow!("New record length is shorter than the original record length"));
}
let mut new_data = Vec::with_capacity(header.record_count as usize * updated_record_len);
    // Only walk the record area; anything after it (e.g. a 0x1A EOF marker) is ignored.
    for chunk in file_data[data_start..data_start + expected_data_len].chunks(old_record_len) {
let mut record = chunk.to_vec();
if record.len() < old_record_len {
let padding_len = old_record_len - record.len();
record.extend(vec![b' '; padding_len]);
}
let padding_len = updated_record_len - old_record_len;
record.extend(vec![b' '; padding_len]);
new_data.extend_from_slice(&record);
}
let mut new_file = Vec::new();
new_file.push(header.file_type);
new_file.push(header.year);
new_file.push(header.month);
new_file.push(header.day);
new_file.extend_from_slice(&header.record_count.to_le_bytes());
new_file.extend_from_slice(&updated_header_len.to_le_bytes());
new_file.extend_from_slice(&(updated_record_len as u16).to_le_bytes());
new_file.extend_from_slice(&header.reserved);
for field in &fields {
let mut desc = [0; 32];
let name_bytes = field.name.as_bytes();
let len = name_bytes.len().min(11);
desc[..len].copy_from_slice(&name_bytes[..len]);
desc[11] = field.field_type as u8;
desc[16] = field.length;
desc[17] = field.decimal;
new_file.extend_from_slice(&desc);
}
new_file.push(0x0D);
    new_file.extend_from_slice(&new_data);
    // Conventional end-of-file marker.
    new_file.push(0x1A);
fs::write(path, new_file).context("Failed to write updated DBF file")?;
Ok(())
}
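/// Modifies existing field descriptors (type, length, decimals) in place and
/// repacks every record to the new layout via a mutable memory map.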
pub fn modify_fields_in_dbf(path: &str, modify_fields: Vec<FieldDescriptor>) -> Result<()> {
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.context(format!("Failed to open file with write access: {}", path))?;
let mut mmap_mut = unsafe { MmapMut::map_mut(&file).context("Failed to map file to memory")? };
let mut header = parse_header(&mmap_mut).context("Failed to parse DBF header")?;
let mut fields = header.fields.clone();
for modify_field in modify_fields {
if let Some(field) = fields.iter_mut().find(|f| f.name == modify_field.name) {
*field = modify_field;
}
}
let fields1: Vec<FieldDescriptor1> = fields
.iter()
.map(|field| FieldDescriptor1 {
name: field.name.clone(),
field_type: field.field_type as u8, length: field.length,
decimal: field.decimal,
})
.collect();
    // The field count is unchanged, so the header length stays as parsed; only the
    // record length can change. A record is the deletion-flag byte plus the fields.
    let updated_record_len = fields1.iter().map(|f| f.length as u16).sum::<u16>() + 1;
    // Capture the original layout before overwriting the header values.
    let total_records = header.record_count as usize;
    let old_record_size = header.record_len as usize;
    let data_start = header.header_len as usize;
    header.record_len = updated_record_len;
    let new_record_size = updated_record_len as usize;
let required_file_size = data_start + total_records * new_record_size + 1;
if required_file_size > mmap_mut.len() {
println!(
"Resizing file: old_size={}, new_size={}",
mmap_mut.len(),
required_file_size
);
file.set_len(required_file_size as u64)
.context("Failed to resize file")?;
mmap_mut = unsafe { MmapMut::map_mut(&file).context("Failed to re-map file")? };
}
let header_data = &mut mmap_mut[0..32];
header_data[0] = header.file_type;
header_data[1] = header.year;
header_data[2] = header.month;
header_data[3] = header.day;
header_data[4..8].copy_from_slice(&header.record_count.to_le_bytes());
header_data[8..10].copy_from_slice(&header.header_len.to_le_bytes());
header_data[10..12].copy_from_slice(&header.record_len.to_le_bytes());
header_data[12..32].copy_from_slice(&header.reserved);
let terminator_pos = 32 + fields1.len() * 32;
if terminator_pos >= mmap_mut.len() {
return Err(anyhow!("Terminator position out of bounds"));
}
mmap_mut[terminator_pos] = 0x0D;
let fields_start = 32;
    for (i, field) in fields1.iter().enumerate() {
        let field_data_start = fields_start + i * 32;
        let field_data = &mut mmap_mut[field_data_start..field_data_start + 32];
        // Rewrite the 11-byte, NUL-padded field name.
        let (name_bytes, _, _) = GBK.encode(&field.name);
        let name_len = name_bytes.len().min(11);
        field_data[0..11].fill(0);
        field_data[0..name_len].copy_from_slice(&name_bytes[..name_len]);
        field_data[11] = field.field_type;
        field_data[16] = field.length;
        field_data[17] = field.decimal;
        // Clear the remaining descriptor bytes.
        for j in 12..32 {
            if j != 16 && j != 17 {
                field_data[j] = 0;
            }
        }
    }
    // Snapshot the original (unmodified) field layout and the old record bytes so
    // that repacking never reads data it has already overwritten.
    let original_fields: Vec<FieldDescriptor1> = header
        .fields
        .iter()
        .map(|field| FieldDescriptor1 {
            name: field.name.clone(),
            field_type: field.field_type as u8,
            length: field.length,
            decimal: field.decimal,
        })
        .collect();
    let old_data_end = (data_start + total_records * old_record_size).min(mmap_mut.len());
    let old_data: Vec<u8> = mmap_mut[data_start..old_data_end].to_vec();
    for record_idx in 0..total_records {
        let old_offset = record_idx * old_record_size; // offset into the snapshot
        let new_offset = data_start + record_idx * new_record_size; // offset into the file
        if old_offset + old_record_size > old_data.len() || new_offset >= mmap_mut.len() {
            return Err(anyhow!("Record offset out of bounds"));
        }
        // Preserve the deletion flag.
        mmap_mut[new_offset] = old_data[old_offset];
        let mut pos_old = 1;
        let mut pos_new = 1;
        for (i, new_field) in fields1.iter().enumerate() {
            let old_field = &original_fields[i];
            let old_start = old_offset + pos_old;
            let old_end = old_start + old_field.length as usize;
            let new_start = new_offset + pos_new;
            let new_end = new_start + new_field.length as usize;
            if old_end > old_data.len() || new_end > mmap_mut.len() {
                return Err(anyhow!("Field data out of bounds"));
            }
            let old_field_data = &old_data[old_start..old_end];
            let mut new_data_buffer = vec![b' '; new_field.length as usize];
            match new_field.field_type {
                b'C' => process_string_field(old_field_data, &mut new_data_buffer, old_field, new_field),
                b'N' => process_numeric_field(old_field_data, &mut new_data_buffer, old_field, new_field),
                b'D' => process_date_field(old_field_data, &mut new_data_buffer),
                b'L' => process_logical_field(old_field_data, &mut new_data_buffer),
                _ => {
                    let copy_len = old_field_data.len().min(new_data_buffer.len());
                    new_data_buffer[..copy_len].copy_from_slice(&old_field_data[..copy_len]);
                }
            }
            mmap_mut[new_start..new_end].copy_from_slice(&new_data_buffer);
            pos_old += old_field.length as usize;
            pos_new += new_field.length as usize;
        }
        if new_record_size > pos_new {
            if new_offset + new_record_size > mmap_mut.len() {
                return Err(anyhow!(
                    "New record size out of bounds: new_offset={}, new_record_size={}, mmap_len={}",
                    new_offset, new_record_size, mmap_mut.len()
                ));
            }
            mmap_mut[new_offset + pos_new..new_offset + new_record_size].fill(b' ');
        }
    }
if required_file_size > 0 {
mmap_mut[required_file_size - 1] = 0x1A;
}
mmap_mut.flush().context("Failed to flush memory map changes to file")?;
file.sync_all().context("Failed to sync file changes")?;
println!("Changes saved to file.");
Ok(())
}
fn process_string_field(
    old_data: &[u8],
    new_data: &mut [u8],
    _old_field: &FieldDescriptor1,
    _new_field: &FieldDescriptor1,
) {
    // Left-justify the trimmed GBK text and pad the new field width with spaces.
let (decoded, _, _) = GBK.decode(old_data);
let decoded_str = decoded.trim_end().to_string();
let (encoded, _, _) = GBK.encode(&decoded_str);
let write_len = encoded.len().min(new_data.len());
new_data[..write_len].copy_from_slice(&encoded[..write_len]);
new_data[write_len..].fill(b' ');
}
fn process_numeric_field(
    old_data: &[u8],
    new_data: &mut [u8],
    _old_field: &FieldDescriptor1,
    _new_field: &FieldDescriptor1,
) {
    // Numeric fields are right-justified. When the field shrinks, keep the
    // rightmost bytes (leading padding is dropped first); when it grows, pad
    // with spaces on the left.
    let copy_len = old_data.len().min(new_data.len());
    let src_start = old_data.len() - copy_len;
    let dst_start = new_data.len() - copy_len;
    new_data[..dst_start].fill(b' ');
    new_data[dst_start..].copy_from_slice(&old_data[src_start..]);
}
fn process_date_field(old_data: &[u8], new_data: &mut [u8]) {
let copy_len = old_data.len().min(new_data.len());
new_data[..copy_len].copy_from_slice(&old_data[..copy_len]);
}
fn process_logical_field(old_data: &[u8], new_data: &mut [u8]) {
if !old_data.is_empty() {
new_data[0] = old_data[0];
}
}
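/// Removes the named fields from the descriptor array and compacts every record,
/// dropping the deleted columns, via a mutable memory map.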
pub fn delete_fields_from_dbf(
path: &str,
delete_field_names: Vec<String>,
) -> Result<()> {
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.context("Failed to open file")?;
let mut mmap = unsafe { MmapMut::map_mut(&file).context("Failed to map file")? };
    let header = parse_header(&mmap)?;
let mut fields = header.fields.clone();
fields.retain(|field| !delete_field_names.contains(&field.name));
    // Header length includes the 0x0D field terminator; record length includes
    // the leading deletion-flag byte.
    let updated_header_len = 32 + fields.len() as u16 * 32 + 1;
    let updated_record_len = fields.iter().map(|f| f.length as u16).sum::<u16>() + 1;
let updated_header = DbfHeader {
record_count: header.record_count,
header_len: updated_header_len,
record_len: updated_record_len,
fields,
file_type: header.file_type,
year: header.year,
month: header.month,
day: header.day,
reserved: header.reserved,
};
let header_data = &mut mmap[0..32];
header_data[0] = updated_header.file_type;
header_data[1] = updated_header.year;
header_data[2] = updated_header.month;
header_data[3] = updated_header.day;
header_data[4..8].copy_from_slice(&updated_header.record_count.to_le_bytes());
header_data[8..10].copy_from_slice(&updated_header.header_len.to_le_bytes());
header_data[10..12].copy_from_slice(&updated_header.record_len.to_le_bytes());
header_data[12..32].copy_from_slice(&updated_header.reserved);
let fields_start = 32;
    for (i, field) in updated_header.fields.iter().enumerate() {
        let field_data_start = fields_start + i * 32;
        let field_data = &mut mmap[field_data_start..field_data_start + 32];
        // Rewrite the 11-byte, NUL-padded field name.
        let name_bytes = field.name.as_bytes();
        let name_len = name_bytes.len().min(11);
        field_data[0..11].fill(0);
        field_data[0..name_len].copy_from_slice(&name_bytes[..name_len]);
        field_data[11] = field.field_type as u8;
        field_data[16] = field.length;
        field_data[17] = field.decimal;
        for j in 12..32 {
            if j != 16 && j != 17 {
                field_data[j] = 0;
            }
        }
    }
    // Terminate the shortened descriptor array and zero the slack left behind by
    // the removed descriptors.
    let old_fields_end = fields_start + header.fields.len() * 32;
    let new_fields_end = fields_start + updated_header.fields.len() * 32;
    mmap[new_fields_end] = 0x0D;
    if new_fields_end + 1 < old_fields_end {
        mmap[new_fields_end + 1..old_fields_end].fill(0);
    }
    // Repack the record data at the new, smaller data offset. Both the data start
    // and the record size shrink, so a forward pass never overwrites bytes that
    // still need to be read.
    let old_data_start = header.header_len as usize;
    let new_data_start = updated_header.header_len as usize;
    let record_size = header.record_len as usize;
    let updated_record_size = updated_header.record_len as usize;
    let total_records = header.record_count as usize;
    for record_idx in 0..total_records {
        let old_offset = old_data_start + record_idx * record_size;
        let new_offset = new_data_start + record_idx * updated_record_size;
        // Preserve the deletion-flag byte.
        mmap[new_offset] = mmap[old_offset];
        let mut pos_old = 1;
        let mut pos_new = 1;
        // Walk the original layout so deleted fields are skipped but still advance
        // the old offset.
        for old_field in &header.fields {
            let old_end = pos_old + old_field.length as usize;
            if !delete_field_names.contains(&old_field.name) {
                let new_end = pos_new + old_field.length as usize;
                let temp_buffer: Vec<u8> =
                    mmap[old_offset + pos_old..old_offset + old_end].to_vec();
                mmap[new_offset + pos_new..new_offset + new_end].copy_from_slice(&temp_buffer);
                pos_new = new_end;
            }
            pos_old = old_end;
        }
        if updated_record_size > pos_new {
            mmap[new_offset + pos_new..new_offset + updated_record_size].fill(b' ');
        }
    }
    // Mark the end of the (now shorter) data area; trailing old bytes are left in place.
    let new_data_end = new_data_start + total_records * updated_record_size;
    if new_data_end < mmap.len() {
        mmap[new_data_end] = 0x1A;
    }
mmap.flush().context("Failed to flush memory map")?;
file.sync_all().context("Failed to sync file")?;
Ok(())
}
fn is_record_matching(record: &HashMap<String, String>, query: &HashMap<String, String>) -> bool {
query.iter().all(|(field_name, value)| {
match record.get(field_name) {
Some(field_value) => field_value == value,
None => false,
}
})
}
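/// Counts the records matching `query` in parallel without collecting them.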
pub fn get_filtered_record_count(path: &str, query: &HashMap<String, String>) -> Result<u32> {
let file = File::open(path).context("Failed to open DBF file")?;
let mmap = unsafe { Mmap::map(&file).context("Failed to map DBF file into memory")? };
let header = parse_header(&mmap)?;
let num_threads = num_cpus::get().min(header.record_count as usize / 1000 + 1);
let records_per_thread = (header.record_count as usize + num_threads - 1) / num_threads;
    let mmap = Arc::new(mmap);
    let base_offset = header.header_len as usize;
let record_size = header.record_len as usize;
let fields = Arc::new(header.fields.clone());
let filtered_count = (0..num_threads).into_par_iter().map(|i| {
let start_idx = i * records_per_thread;
let end_idx = (start_idx + records_per_thread).min(header.record_count as usize);
        let start_offset = (base_offset + start_idx * record_size).min(mmap.len());
        let end_offset = (base_offset + end_idx * record_size).min(mmap.len());
        let chunk = &mmap[start_offset..end_offset];
let mut local_count = 0;
for record_data in chunk.chunks(record_size) {
if record_data[0] == 0x2A {
continue;
}
match parse_record(record_data, &fields) {
Ok(record) => {
if is_record_matching(&record, query) {
local_count += 1;
}
}
Err(e) => {
eprintln!("Error parsing record: {:?}", e);
}
}
}
        local_count
    })
    .sum::<u32>();
Ok(filtered_count)
}